package com.sisyphus.loginfail_detect

import java.util

import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.scala._

/**
 * Risk control 1: malicious login monitoring.
 *
 * Requirement
 *  - A user who fails to log in repeatedly within a short time may be the target of an
 *    automated attack.
 *  - Raise a warning when the same user (possibly from different IPs) has consecutive
 *    login failures within 2 seconds.
 *
 * Approach
 *  - A simple solution stores the failed logins in ListState, registers a timer that fires
 *    2 seconds later and counts the failures held in the state.
 *  - For more precise detection, this job uses the Flink CEP library to match a pattern on
 *    the event stream.
 */
object LoginFailWithCep {
  /**
   * Builds and runs the job: read `/Loginlog.csv` from the classpath, parse each line into a
   * `LoginEvent`, key the stream by user id, match two consecutive "fail" events within
   * 2 seconds and print one `Warning` per match.
   *
   * @param args command-line arguments (unused)
   * @throws IllegalStateException if `/Loginlog.csv` cannot be found on the classpath
   */
  def main(args: Array[String]): Unit = {
    // 1. env
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // 2. source
    // Read the raw event lines. getResource returns null when the file is missing, so fail
    // with an explicit message instead of a bare NullPointerException on getPath.
    val resource = Option(getClass.getResource("/Loginlog.csv")).getOrElse(
      throw new IllegalStateException("Resource /Loginlog.csv not found on the classpath")
    )
    val dataStream = env.readTextFile(resource.getPath)

    // 3. transformation
    // Each line is expected to hold at least four comma-separated fields; the first and the
    // fourth must be numeric. NOTE(review): the meaning of each column is defined by the
    // LoginEvent case class declared elsewhere - confirm against its definition.
    val loginEventStream = dataStream
      .map { line =>
        val fields = line.split(",")
        LoginEvent(fields(0).trim.toLong, fields(1).trim, fields(2).trim, fields(3).trim.toLong)
      }
      // Tolerate events arriving up to 5 seconds out of order.
      .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[LoginEvent](Time.seconds(5)) {
          // Flink timestamps are in milliseconds; eventTime is presumably in seconds, hence
          // the multiplication - TODO confirm against the input data.
          override def extractTimestamp(element: LoginEvent): Long = element.eventTime * 1000L
        })
      .keyBy(_.userId)

    // Pattern: a "fail" event directly followed by another "fail" event, both within 2 seconds.
    val loginFailPattern = Pattern
      .begin[LoginEvent]("begin").where(_.eventType == "fail")
      .next("next").where(_.eventType == "fail")
      .within(Time.seconds(2))

    // Apply the pattern to the keyed event stream to obtain a pattern stream.
    val patternStream = CEP.pattern(loginEventStream, loginFailPattern)

    // Turn every matched event sequence into a Warning.
    val loginFailDataStream = patternStream.select(new LoginFailMatch())

    // 4. sink
    loginFailDataStream.print()

    // 5. execute
    env.execute("login fail with cep job")
  }
}

/**
 * Select function that converts one pattern match into a `Warning`.
 *
 * The match map is keyed by pattern stage name; this function reads the first event stored
 * under "begin" and the first event stored under "next".
 */
class LoginFailMatch() extends PatternSelectFunction[LoginEvent, Warning] {
  /**
   * @param map matched events grouped by the name of the pattern stage that captured them
   * @return a warning carrying the user id and event time of the "begin" event, the event
   *         time of the "next" event and a fixed message
   */
  override def select(map: util.Map[String, util.List[LoginEvent]]): Warning = {
    // Fetch the first event captured by the pattern stage with the given name.
    def firstEventOf(stage: String): LoginEvent = map.get(stage).iterator().next()

    val initialFailure = firstEventOf("begin")
    val followingFailure = firstEventOf("next")
    Warning(
      initialFailure.userId,
      initialFailure.eventTime,
      followingFailure.eventTime,
      "login fail!"
    )
  }
}
